---
title: prefect-sqlalchemy
---

# Welcome!

`prefect-sqlalchemy` helps you connect to a database in your Prefect flows. 

## Getting started

### Install `prefect-sqlalchemy`

The following command will install a version of `prefect-sqlalchemy` compatible with your installed version of `prefect`. 
If you don't already have `prefect` installed, it will install the newest version of `prefect` as well.

```bash
pip install "prefect[sqlalchemy]"
```

Upgrade to the latest versions of `prefect` and `prefect-sqlalchemy`:

```bash
pip install -U "prefect[sqlalchemy]"
```

### Register newly installed block types

Register the block types in the `prefect-sqlalchemy` module to make them available for use.


```bash
prefect block register -m prefect_sqlalchemy
```

## Examples

### Save credentials to a block

Before you can call the `load` method on a block, the block must already be saved, either through code or through the UI. The following example saves a connector block through code:

```python
from prefect_sqlalchemy import SqlAlchemyConnector, ConnectionComponents, SyncDriver

connector = SqlAlchemyConnector(
    connection_info=ConnectionComponents(
        driver=SyncDriver.POSTGRESQL_PSYCOPG2,
        username="USERNAME-PLACEHOLDER",
        password="PASSWORD-PLACEHOLDER",
        host="localhost",
        port=5432,
        database="DATABASE-PLACEHOLDER",
    )
)

connector.save("BLOCK_NAME-PLACEHOLDER")
```

Load the saved block that holds your credentials:

```python
from prefect_sqlalchemy import SqlAlchemyConnector

SqlAlchemyConnector.load("BLOCK_NAME-PLACEHOLDER")
```

The required arguments depend on the driver. For example, SQLite requires only the `driver` and `database` arguments:

```python
from prefect_sqlalchemy import SqlAlchemyConnector, ConnectionComponents, SyncDriver

connector = SqlAlchemyConnector(
    connection_info=ConnectionComponents(
        driver=SyncDriver.SQLITE_PYSQLITE,
        database="DATABASE-PLACEHOLDER.db"
    )
)

connector.save("BLOCK_NAME-PLACEHOLDER")
```
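
To see why the required arguments differ between drivers, it can help to look at the connection URL that these components describe. The sketch below calls SQLAlchemy's own `URL.create` directly rather than going through `ConnectionComponents`. The driver strings `postgresql+psycopg2` and `sqlite+pysqlite` are assumptions about what the `SyncDriver` members used above correspond to.

```python
from sqlalchemy.engine import URL

# A server-based database needs credentials and a network location.
# The driver string is an assumed equivalent of SyncDriver.POSTGRESQL_PSYCOPG2.
postgres_url = URL.create(
    drivername="postgresql+psycopg2",
    username="USERNAME-PLACEHOLDER",
    password="PASSWORD-PLACEHOLDER",
    host="localhost",
    port=5432,
    database="DATABASE-PLACEHOLDER",
)

# SQLite is a local file, so only the driver and the database path apply.
# The driver string is an assumed equivalent of SyncDriver.SQLITE_PYSQLITE.
sqlite_url = URL.create(
    drivername="sqlite+pysqlite",
    database="DATABASE-PLACEHOLDER.db",
)

# hide_password=True masks the password when rendering the URL.
print(postgres_url.render_as_string(hide_password=True))
print(sqlite_url.render_as_string(hide_password=True))
```

The SQLite URL has no username, password, host, or port, which is why those arguments can be left out for that driver.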

### Work with databases in a flow

To set up a table, use the `execute` and `execute_many` methods. 

Use the `fetch_many` method to retrieve data in a stream until there's no more data.

Use the `SqlAlchemyConnector` as a context manager to ensure that the SQLAlchemy engine and any connected resources are closed properly when you're done with them.
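
The patterns above (a single execute, a batched execute, and fetching fixed-size batches inside a context manager) can be tried without a Prefect server by using Python's standard-library `sqlite3` module. This is only an analogy, not the connector's implementation: `executemany`, `fetchmany`, and `contextlib.closing` are standard-library names standing in for `execute_many`, `fetch_many`, and the connector's context manager.

```python
import sqlite3
from contextlib import closing

# closing() guarantees conn.close() runs, even if a statement raises.
with closing(sqlite3.connect(":memory:")) as conn:
    conn.execute(
        "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
    )
    # One statement with one set of named parameters.
    conn.execute(
        "INSERT INTO customers (name, address) VALUES (:name, :address);",
        {"name": "Marvin", "address": "Highway 42"},
    )
    # One statement with a sequence of parameter sets.
    conn.executemany(
        "INSERT INTO customers (name, address) VALUES (:name, :address);",
        [
            {"name": "Ford", "address": "Highway 42"},
            {"name": "Unknown", "address": "Highway 42"},
        ],
    )

    # Execute the query once, then pull results two rows at a time
    # until an empty batch signals that the data is exhausted.
    cursor = conn.execute("SELECT name, address FROM customers ORDER BY rowid")
    batches = []
    while True:
        batch = cursor.fetchmany(2)
        if not batch:
            break
        batches.append(batch)

# Three rows fetched two at a time arrive as batches of sizes 2 and 1.
print([len(batch) for batch in batches])
```

Fetching in batches keeps memory use bounded when a query returns more rows than you want to hold at once. The empty batch is the stop condition.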


<Note>

**Async support**

`SqlAlchemyConnector` supports async workflows. Make sure the block you save, load, and use is configured with an async driver, as in the example below.

```python
from prefect_sqlalchemy import SqlAlchemyConnector, ConnectionComponents, AsyncDriver

connector = SqlAlchemyConnector(
    connection_info=ConnectionComponents(
        driver=AsyncDriver.SQLITE_AIOSQLITE,
        database="DATABASE-PLACEHOLDER.db"
    )
)

if __name__ == "__main__":
    connector.save("BLOCK_NAME-PLACEHOLDER")
```
</Note>

<Tabs>
<Tab title="Sync">

```python
from prefect import flow, task
from prefect_sqlalchemy import SqlAlchemyConnector


@task
def setup_table(block_name: str) -> None:
    with SqlAlchemyConnector.load(block_name) as connector:
        connector.execute(
            "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
        )
        connector.execute(
            "INSERT INTO customers (name, address) VALUES (:name, :address);",
            parameters={"name": "Marvin", "address": "Highway 42"},
        )
        connector.execute_many(
            "INSERT INTO customers (name, address) VALUES (:name, :address);",
            seq_of_parameters=[
                {"name": "Ford", "address": "Highway 42"},
                {"name": "Unknown", "address": "Highway 42"},
            ],
        )

@task
def fetch_data(block_name: str) -> list:
    all_rows = []
    with SqlAlchemyConnector.load(block_name) as connector:
        while True:
            # Repeated fetch* calls using the same operation will
            # skip re-executing and instead return the next set of results
            new_rows = connector.fetch_many("SELECT * FROM customers", size=2)
            if len(new_rows) == 0:
                break
            # extend() keeps all_rows a flat list of rows, not a list of batches
            all_rows.extend(new_rows)
    return all_rows

@flow
def sqlalchemy_flow(block_name: str) -> list:
    setup_table(block_name)
    all_rows = fetch_data(block_name)
    return all_rows


if __name__ == "__main__":
    sqlalchemy_flow("BLOCK_NAME-PLACEHOLDER")
```

</Tab>

<Tab title="Async">

```python
import asyncio

from prefect import flow, task
from prefect_sqlalchemy import SqlAlchemyConnector


@task
async def setup_table(block_name: str) -> None:
    async with await SqlAlchemyConnector.load(block_name) as connector:
        await connector.execute(
            "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
        )
        await connector.execute(
            "INSERT INTO customers (name, address) VALUES (:name, :address);",
            parameters={"name": "Marvin", "address": "Highway 42"},
        )
        await connector.execute_many(
            "INSERT INTO customers (name, address) VALUES (:name, :address);",
            seq_of_parameters=[
                {"name": "Ford", "address": "Highway 42"},
                {"name": "Unknown", "address": "Highway 42"},
            ],
        )

@task
async def fetch_data(block_name: str) -> list:
    all_rows = []
    async with await SqlAlchemyConnector.load(block_name) as connector:
        while True:
            # Repeated fetch* calls using the same operation will
            # skip re-executing and instead return the next set of results
            new_rows = await connector.fetch_many("SELECT * FROM customers", size=2)
            if len(new_rows) == 0:
                break
            # extend() keeps all_rows a flat list of rows, not a list of batches
            all_rows.extend(new_rows)
    return all_rows

@flow
async def sqlalchemy_flow(block_name: str) -> list:
    await setup_table(block_name)
    all_rows = await fetch_data(block_name)
    return all_rows


if __name__ == "__main__":
    asyncio.run(sqlalchemy_flow("BLOCK_NAME-PLACEHOLDER"))
```

</Tab>
</Tabs>


## Resources

Refer to the `prefect-sqlalchemy` [SDK documentation](https://reference.prefect.io/prefect_sqlalchemy/) to explore all the capabilities of the `prefect-sqlalchemy` library.

For assistance using SQLAlchemy, consult the [SQLAlchemy documentation](https://www.sqlalchemy.org/).
